# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# Targeted Impala insert tests

from __future__ import absolute_import, division, print_function
import os
import pytest
import re

from testdata.common import widetable
from tests.common.impala_cluster import ImpalaCluster
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.parametrize import UniqueDatabase
from tests.common.skip import (SkipIfFS, SkipIfLocal, SkipIfHive2,
    SkipIfNotHdfsMinicluster)
from tests.common.test_dimensions import (
    add_exec_option_dimension,
    create_exec_option_dimension,
    create_uncompressed_text_dimension,
    create_single_exec_option_dimension,
    is_supported_insert_format)
from tests.common.test_result_verifier import (
    QueryTestResult,
    parse_result_rows)
from tests.common.test_vector import HS2
from tests.verifiers.metric_verifier import MetricVerifier


PARQUET_CODECS = ['none', 'snappy', 'gzip', 'zstd', 'lz4']


class TestInsertBase(ImpalaTestSuite):
  """All tests based on this class should run with 'unique_database' fixture."""

  @classmethod
  def add_test_dimensions(cls):
    super(TestInsertBase, cls).add_test_dimensions()
    if cls.exploration_strategy() == 'core':
      cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
          cluster_sizes=[0], disable_codegen_options=[True, False], batch_sizes=[0],
          sync_ddl=[0]))
      cls.ImpalaTestMatrix.add_dimension(
          create_uncompressed_text_dimension(cls.get_workload()))
    else:
      cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
          cluster_sizes=[0], disable_codegen_options=[True, False],
          batch_sizes=[0, 1, 16], sync_ddl=[0, 1]))
      add_exec_option_dimension(cls, "compression_codec", PARQUET_CODECS)
      # Insert is currently only supported for text and parquet
      # For parquet, we want to iterate through all the compression codecs
      # TODO: each column in parquet can have a different codec.  We could
      # test all the codecs in one table/file with some additional flags.
      cls.ImpalaTestMatrix.add_constraint(lambda v:
          v.get_value('table_format').file_format == 'parquet'
          or (v.get_value('table_format').file_format == 'text'
              and v.get_value('compression_codec') == 'none'))
      cls.ImpalaTestMatrix.add_constraint(lambda v:
          v.get_value('table_format').compression_codec == 'none')
      # Only test other batch sizes for uncompressed parquet to keep the execution time
      # within reasonable bounds.
      cls.ImpalaTestMatrix.add_constraint(lambda v:
          v.get_value('exec_option')['batch_size'] == 0
          or (v.get_value('table_format').file_format == 'parquet'
              and v.get_value('compression_codec') == 'none'))

  @classmethod
  def is_sync_ddl(cls, vector):
    return vector.get_value('exec_option')['sync_ddl'] == 1


class TestInsertQueries(TestInsertBase):

  @pytest.mark.execute_serially
  def test_insert_large_string(self, vector, unique_database):
    """Test handling of large strings in inserter and scanner."""
    if "-Xcheck:jni" in os.environ.get("LIBHDFS_OPTS", ""):
      pytest.skip("Test unreasonably slow with JNI checking.")
    table_name = unique_database + ".insert_largestring"

    self.client.set_configuration_option("mem_limit", "4gb")
    self.client.set_configuration_option("max_row_size", "257mb")
    file_format = vector.get_value('table_format').file_format
    if file_format == "parquet":
      stored_as = file_format
    else:
      assert file_format == "text"
      stored_as = "textfile"
    self.client.execute("""
        create table {0}
        stored as {1} as
        select repeat('AZ', 128 * 1024 * 1024) as s""".format(table_name, stored_as))

    # Make sure it produces correct result when materializing no tuples.
    result = self.client.execute("select count(*) from {0}".format(table_name))
    assert result.data == ["1"]

    # Make sure it got the length right.
    result = self.client.execute("select length(s) from {0}".format(table_name))
    assert result.data == [str(2 * 128 * 1024 * 1024)]

    # Spot-check the data.
    result = self.client.execute(
        "select substr(s, 200 * 1024 * 1024, 5) from {0}".format(table_name))
    assert result.data == ["ZAZAZ"]

    # IMPALA-7648: test that we gracefully fail when there is not enough memory
    # to fit the scanned string in memory.
    # IMPALA-9856: Disable result spooling for this query since it is intended to test
    # for OOM.
    self.client.set_configuration_option("spool_query_results", "0")
    self.client.set_configuration_option("mem_limit", "50M")
    try:
      self.client.execute("select s from {0}".format(table_name))
      assert False, "Expected query to fail"
    except Exception as e:
      assert "Memory limit exceeded" in str(e)

  @UniqueDatabase.parametrize(sync_ddl=True)
  # ABFS partition names cannot end in periods
  @SkipIfFS.file_or_folder_name_ends_with_period
  def test_insert(self, vector, unique_database):
    self.run_test_case('QueryTest/insert', vector, unique_database,
        multiple_impalad=self.is_sync_ddl(vector),
        test_file_vars={'$ORIGINAL_DB': ImpalaTestSuite
        .get_db_name_from_format(vector.get_value('table_format'))})

  @UniqueDatabase.parametrize(sync_ddl=True)
  @pytest.mark.execute_serially
  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
  def test_insert_mem_limit(self, vector, unique_database):
    self.run_test_case('QueryTest/insert-mem-limit', vector, unique_database,
        multiple_impalad=self.is_sync_ddl(vector),
        test_file_vars={'$ORIGINAL_DB': ImpalaTestSuite
        .get_db_name_from_format(vector.get_value('table_format'))})
    # IMPALA-7023: These queries can linger and use up memory, causing subsequent
    # tests to hit memory limits. Wait for some time to allow the query to
    # be reclaimed.
    verifiers = [MetricVerifier(i.service)
                 for i in ImpalaCluster.get_e2e_test_cluster().impalads]
    for v in verifiers:
      v.wait_for_metric("impala-server.num-fragments-in-flight", 0, timeout=180)

  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_insert_overwrite(self, vector, unique_database):
    self.run_test_case('QueryTest/insert_overwrite', vector, unique_database,
        multiple_impalad=self.is_sync_ddl(vector),
        test_file_vars={'$ORIGINAL_DB': ImpalaTestSuite
        .get_db_name_from_format(vector.get_value('table_format'))})

  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_insert_bad_expr(self, vector, unique_database):
    # The test currently relies on codegen being disabled to trigger an error in
    # the output expression of the table sink.
    if vector.get_value('exec_option')['disable_codegen']:
      self.run_test_case('QueryTest/insert_bad_expr', vector, unique_database,
          multiple_impalad=self.is_sync_ddl(vector),
          test_file_vars={'$ORIGINAL_DB': ImpalaTestSuite
          .get_db_name_from_format(vector.get_value('table_format'))})


class TestInsertQueriesWithDefaultFormat(TestInsertBase):

  @classmethod
  def add_test_dimensions(cls):
    super(TestInsertQueriesWithDefaultFormat, cls).add_test_dimensions()
    # Declare 'default_file_format' option and match it against table_format.
    # This is needed because CREATE queries inside .test files are often not CTAS,
    # not CREATE TABLE LIKE, and do not have STORED AS clause.
    add_exec_option_dimension(cls, "default_file_format", ['text', 'parquet'])
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('table_format').file_format == v.get_value('default_file_format'))

  @SkipIfHive2.acid
  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_acid_insert(self, vector, unique_database):
    self.run_test_case('QueryTest/acid-insert', vector, unique_database,
        multiple_impalad=self.is_sync_ddl(vector))

  @SkipIfHive2.acid
  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_acid_nonacid_insert(self, vector, unique_database):
    self.run_test_case('QueryTest/acid-nonacid-insert', vector, unique_database,
        multiple_impalad=self.is_sync_ddl(vector))

  @SkipIfHive2.acid
  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_acid_insert_fail(self, vector, unique_database):
    self.run_test_case('QueryTest/acid-insert-fail', vector, unique_database,
        multiple_impalad=self.is_sync_ddl(vector))

  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_insert_random_partition(self, vector, unique_database):
    """Regression test for IMPALA-402: partitioning by rand() leads to strange behaviour
    or crashes."""
    self.run_test_case('QueryTest/insert-random-partition', vector, unique_database,
        multiple_impalad=self.is_sync_ddl(vector))


class TestInsertWideTable(ImpalaTestSuite):
  @classmethod
  def add_test_dimensions(cls):
    super(TestInsertWideTable, cls).add_test_dimensions()

    # Only vary codegen
    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
        cluster_sizes=[0], disable_codegen_options=[True, False], batch_sizes=[0]))

    # Inserts only supported on text and parquet
    # TODO: Enable 'text'/codec once the compressed text writers are in.
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('table_format').file_format == 'parquet'
        or v.get_value('table_format').file_format == 'text')
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('table_format').compression_codec == 'none')

    # Don't run on core. This test is very slow (IMPALA-864) and we are unlikely to
    # regress here.
    if cls.exploration_strategy() != 'exhaustive':
      pytest.skip("Test only run in exhaustive exploration.")

  @SkipIfLocal.parquet_file_size
  def test_insert_wide_table(self, vector, unique_database):
    table_format = vector.get_value('table_format')

    # Text can't handle as many columns as Parquet (codegen takes forever)
    num_cols = 1000 if table_format.file_format == 'text' else 2000

    table_name = unique_database + ".insert_widetable"
    if vector.get_value('exec_option')['disable_codegen']:
      table_name += "_codegen_disabled"

    col_descs = widetable.get_columns(num_cols)
    create_stmt = "CREATE TABLE " + table_name + "(" + ','.join(col_descs) + ")"
    if vector.get_value('table_format').file_format == 'parquet':
      create_stmt += " stored as parquet"
    self.client.execute(create_stmt)

    # Get a single row of data
    col_vals = widetable.get_data(num_cols, 1, quote_strings=True)[0]
    insert_stmt = "INSERT INTO " + table_name + " VALUES(" + col_vals + ")"
    self.client.execute(insert_stmt)

    result = self.client.execute("select count(*) from " + table_name)
    assert result.data == ["1"]

    result = self.client.execute("select * from " + table_name)
    types = result.column_types
    labels = result.column_labels
    expected = QueryTestResult([col_vals], types, labels, order_matters=False)
    actual = QueryTestResult(
      parse_result_rows(result), types, labels, order_matters=False)
    assert expected == actual


class TestInsertPartKey(ImpalaTestSuite):
  """Regression test for IMPALA-875"""
  @classmethod
  def add_test_dimensions(cls):
    super(TestInsertPartKey, cls).add_test_dimensions()
    # Only run for a single table type
    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
        cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[0],
        sync_ddl=[1]))

    cls.ImpalaTestMatrix.add_constraint(lambda v:
        (v.get_value('table_format').file_format == 'text'))
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('table_format').compression_codec == 'none')

  @pytest.mark.execute_serially
  def test_insert_part_key(self, vector):
    """Test that partition column exprs are cast to the correct type. See IMPALA-875."""
    self.run_test_case('QueryTest/insert_part_key', vector,
        multiple_impalad=vector.get_value('exec_option')['sync_ddl'] == 1)

  def test_escaped_partition_values(self, unique_database):
    """Test for special characters in partition values."""
    tbl = unique_database + ".tbl"
    self.execute_query(
        "create table {}(i int) partitioned by (p string) stored as parquet".format(tbl))
    # "\\'" is used to represent a single quote since the insert statement uses a single
    # quote on the partition value. "\\\\" is used for backslash since it gets escaped
    # again in parsing the insert statement.
    special_characters = "SpecialCharacters\x01\x02\x03\x04\x05\x06\x07\b\t\n\v\f\r" \
                         "\x0E\x0F\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1A\x1B" \
                         "\x1C\x1D\x1E\x1F\"\x7F\\'%*/:=?\\\\{[]#^"
    part_value = "SpecialCharacters\x01\x02\x03\x04\x05\x06\x07\b\t\n\v\f\r" \
                 "\x0E\x0F\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1A\x1B" \
                 "\x1C\x1D\x1E\x1F\"\x7F'%*/:=?\\{[]#^"
    part_dir = "p=SpecialCharacters%01%02%03%04%05%06%07%08%09%0A%0B%0C%0D%0E%0F%" \
                "10%11%12%13%14%15%16%17%18%19%1A%1B%1C%1D%1E%1F%22%7F%27%25%2A" \
                "%2F%3A%3D%3F%5C%7B%5B%5D%23%5E"
    res = self.execute_query(
        "insert into {} partition(p='{}') values (0)".format(tbl, special_characters))
    assert part_dir in res.runtime_profile
    assert 'NumModifiedRows: 1\n' in res.runtime_profile
    res = self.client.execute("select p from {}".format(tbl))
    assert res.data[0] == part_value
    res = self.execute_query("show partitions " + tbl)
    # There is a "\t" in the partition value making splitting of the result line
    # difficult, hence we only verify that the value is present in string
    # representation of the whole row.
    assert part_value in res.data[0]
    assert part_dir in res.data[0]

  def test_escaped_partition_values_iceberg(self, unique_database):
    """Test for special characters in partition values for iceberg tables"""
    tbl = unique_database + ".tbl"
    self.execute_query("create table {} (id int, p string) partitioned by"
                       " spec (identity(p)) stored by iceberg".format(tbl))
    # "\\'" is used to represent a single quote since the insert statement uses a single
    # quote on the partition value. "\\\\" is used for backslash since it gets escaped
    # again in parsing the insert statement.
    special_characters = "SpecialCharacters\x01\x02\x03\x04\x05\x06\x07\b\t\n\v\f\r" \
                         "\x0E\x0F\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1A\x1B" \
                         "\x1C\x1D\x1E\x1F\"\x7F\\'%*/:=?\\\\{[]#^"
    part_value = "SpecialCharacters\x01\x02\x03\x04\x05\x06\x07\b\t\n\v\f\r" \
                 "\x0E\x0F\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1A\x1B" \
                 "\x1C\x1D\x1E\x1F\"\x7F'%*/:=?\\{[]#^"
    part_dir = "p=SpecialCharacters%01%02%03%04%05%06%07%08%09%0A%0B%0C%0D%0E%0F%" \
               "10%11%12%13%14%15%16%17%18%19%1A%1B%1C%1D%1E%1F%22%7F%27%25*" \
               "%2F%3A%3D%3F%5C%7B%5B%5D%23%5E"
    show_part_value = "SpecialCharacters\\u0001\\u0002\\u0003\\u0004\\u0005\\u0006" \
                      "\\u0007\\b\\t\\n\\u000B\\f\\r\\u000E\\u000F\\u0010" \
                      "\\u0011\\u0012\\u0013\\u0014\\u0015\\u0016\\u0017" \
                      "\\u0018\\u0019\\u001A\\u001B\\u001C\\u001D\\u001E" \
                     "\\u001F\\\"\\u007F\'%*\\/:=?\\\\{[]#^"
    res = self.execute_query(
        "insert into {} values (0, '{}')".format(tbl, special_characters))
    assert part_dir in res.runtime_profile
    assert 'NumModifiedRows: 1\n' in res.runtime_profile
    res = self.client.execute("select p from {}".format(tbl))
    assert res.data[0] == part_value
    res = self.execute_query("show partitions " + tbl)
    # There is a "\t" in the partition value making splitting of the result line
    # difficult, hence we only verify that the value is present in string
    # representation of the whole row.
    assert show_part_value in res.data[0]


class TestInsertNullQueries(ImpalaTestSuite):
  @classmethod
  def add_test_dimensions(cls):
    super(TestInsertNullQueries, cls).add_test_dimensions()
    # Fix the exec_option vector to have a single value. This is needed should we decide
    # to run the insert tests in parallel (otherwise there will be two tests inserting
    # into the same table at the same time for the same file format).
    # TODO: When we do decide to run these tests in parallel we could create unique temp
    # tables for each test case to resolve the concurrency problems.
    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
        cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[0]))

    # These tests only make sense for inserting into a text table with special
    # logic to handle all the possible ways NULL needs to be written as ascii
    cls.ImpalaTestMatrix.add_constraint(lambda v:
          (v.get_value('table_format').file_format == 'text'
           and v.get_value('table_format').compression_codec == 'none'))

  @classmethod
  def setup_class(cls):
    super(TestInsertNullQueries, cls).setup_class()

  def test_insert_null(self, vector, unique_database):
    self.run_test_case('QueryTest/insert_null', vector, unique_database,
        test_file_vars={'$ORIGINAL_DB': ImpalaTestSuite
        .get_db_name_from_format(vector.get_value('table_format'))})


class TestInsertFileExtension(ImpalaTestSuite):
  """Tests that files written to a table have the correct file extension. Asserts that
  Parquet files end with .parq and text files end with .txt."""

  @classmethod
  def add_test_dimensions(cls):
    super(TestInsertFileExtension, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        is_supported_insert_format(v.get_value('table_format')))
    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())

  @classmethod
  def setup_class(cls):
    super(TestInsertFileExtension, cls).setup_class()

  def test_file_extension(self, vector, unique_database):
    table_format = vector.get_value('table_format').file_format
    if table_format == 'parquet':
      file_extension = '.parq'
      stored_as_format = 'parquet'
    else:
      file_extension = '.txt'
      stored_as_format = 'textfile'
    table_name = "{0}_table".format(table_format)
    ctas_query = "create table {0}.{1} stored as {2} as select 1".format(
        unique_database, table_name, stored_as_format)
    self.execute_query_expect_success(self.client, ctas_query)
    for path in self.filesystem_client.ls("test-warehouse/{0}.db/{1}".format(
        unique_database, table_name)):
      if not path.startswith('_'): assert path.endswith(file_extension)


class TestInsertHdfsWriterLimit(ImpalaTestSuite):
  """Test to make sure writer fragment instances are distributed evenly when using max
      hdfs_writers query option."""
  @classmethod
  def default_test_protocol(cls):
    return HS2

  @classmethod
  def add_test_dimensions(cls):
    super(TestInsertHdfsWriterLimit, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        (v.get_value('table_format').file_format == 'parquet'))

  @UniqueDatabase.parametrize(sync_ddl=True)
  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
  def test_insert_writer_limit(self, unique_database):
    # Root internal (non-leaf) fragment.
    query = "create table {0}.test1 as select int_col from " \
            "functional_parquet.alltypes".format(unique_database)
    self.__run_insert_and_verify_instances(query, max_fs_writers=2, mt_dop=0,
                                           expected_num_instances_per_host=[1, 2, 2])
    # Root coordinator fragment.
    query = "create table {0}.test2 as select int_col from " \
            "functional_parquet.alltypes limit 100000".format(unique_database)
    self.__run_insert_and_verify_instances(query, max_fs_writers=2, mt_dop=0,
                                           expected_num_instances_per_host=[1, 1, 2])
    # Root scan fragment. Instance count within limit.
    query = "create table {0}.test3 as select int_col from " \
            "functional_parquet.alltypes".format(unique_database)
    self.__run_insert_and_verify_instances(query, max_fs_writers=4, mt_dop=0,
                                           expected_num_instances_per_host=[1, 1, 1])

  @UniqueDatabase.parametrize(sync_ddl=True)
  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
  def test_mt_dop_writer_limit(self, unique_database):
    # Root internal (non-leaf) fragment.
    query = "create table {0}.test1 as select int_col from " \
            "functional_parquet.alltypes".format(unique_database)
    self.__run_insert_and_verify_instances(query, max_fs_writers=11, mt_dop=10,
                                           expected_num_instances_per_host=[11, 12, 12])
    # Root coordinator fragment.
    query = "create table {0}.test2 as select int_col from " \
            "functional_parquet.alltypes limit 100000".format(unique_database)
    self.__run_insert_and_verify_instances(query, max_fs_writers=2, mt_dop=10,
                                           expected_num_instances_per_host=[8, 8, 9])
    # Root scan fragment. Instance count within limit.
    query = "create table {0}.test3 as select int_col from " \
            "functional_parquet.alltypes".format(unique_database)
    self.__run_insert_and_verify_instances(query, max_fs_writers=30, mt_dop=10,
                                           expected_num_instances_per_host=[8, 8, 8])

  @UniqueDatabase.parametrize(sync_ddl=True)
  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
  def test_processing_cost_writer_limit(self, unique_database):
    """Test both scenario of partitioned and unpartitioned insert.
    All of the unpartitioned testscases will result in one instance writer because the
    output volume is less than 256MB. Partitoned insert will result in 1 writer per node
    unless max_fs_writers is set lower than num nodes."""
    # Root internal (non-leaf) fragment.
    query = "create table {0}.test1 as select int_col from " \
            "functional_parquet.alltypes".format(unique_database)
    self.__run_insert_and_verify_instances(query, max_fs_writers=2,
                                           expected_num_instances_per_host=[1, 1, 2],
                                           processing_cost_min_threads=1)
    # Root coordinator fragment.
    query = "create table {0}.test2 as select int_col from " \
            "functional_parquet.alltypes limit 100000".format(unique_database)
    self.__run_insert_and_verify_instances(query, max_fs_writers=2,
                                           expected_num_instances_per_host=[1, 1, 2],
                                           processing_cost_min_threads=1)
    # Root internal (non-leaf) fragment. Instance count within limit.
    query = "create table {0}.test3 as select int_col from " \
            "functional_parquet.alltypes".format(unique_database)
    self.__run_insert_and_verify_instances(query, max_fs_writers=30,
                                           expected_num_instances_per_host=[1, 1, 2],
                                           processing_cost_min_threads=1)
    # Root internal (non-leaf) fragment. No max_fs_writers.
    # Scan node and writer sink should always be in separate fragment with cost-based
    # scaling.
    query = "create table {0}.test4 as select int_col from " \
            "functional_parquet.alltypes".format(unique_database)
    self.__run_insert_and_verify_instances(query, max_fs_writers=0,
                                           expected_num_instances_per_host=[1, 1, 2],
                                           processing_cost_min_threads=1)
    # Partitioned insert with 6 distinct partition values.
    # Should create at least 1 writer per node.
    query = "create table {0}.test5 partitioned by (ss_store_sk) as " \
            "select ss_item_sk, ss_ticket_number, ss_store_sk " \
            "from tpcds_parquet.store_sales".format(unique_database)
    self.__run_insert_and_verify_instances(query, max_fs_writers=0,
                                           expected_num_instances_per_host=[2, 2, 2],
                                           processing_cost_min_threads=1)
    # Partitioned insert can still be limited by max_fs_writers option.
    query = "create table {0}.test6 partitioned by (ss_store_sk) as " \
            "select ss_item_sk, ss_ticket_number, ss_store_sk " \
            "from tpcds_parquet.store_sales".format(unique_database)
    self.__run_insert_and_verify_instances(query, max_fs_writers=2,
                                           expected_num_instances_per_host=[1, 2, 2],
                                           processing_cost_min_threads=1)

  def __run_insert_and_verify_instances(self, query, max_fs_writers=0, mt_dop=0,
                                        processing_cost_min_threads=0,
                                        expected_num_instances_per_host=[]):
    self.client.set_configuration_option("max_fs_writers", max_fs_writers)
    self.client.set_configuration_option("mt_dop", mt_dop)
    if processing_cost_min_threads > 0:
      self.client.set_configuration_option("compute_processing_cost", "true")
      self.client.set_configuration_option("processing_cost_min_threads",
          processing_cost_min_threads)
    # Test depends on both planner and scheduler to see the same state of the cluster
    # having 3 executors, so to reduce flakiness we make sure all 3 executors are up
    # and running.
    self.impalad_test_service.wait_for_metric_value("cluster-membership.backends.total",
                                                    3)
    result = self.client.execute(query, fetch_exec_summary=True)
    assert 'HDFS WRITER' in result.exec_summary[0]['operator'], result.runtime_profile
    if (max_fs_writers > 0):
      num_writers = int(result.exec_summary[0]['num_instances'])
      assert (num_writers <= max_fs_writers), result.runtime_profile
    regex = r'Per Host Number of Fragment Instances' \
            r':.*?\((.*?)\).*?\((.*?)\).*?\((.*?)\).*?\n'
    matches = re.findall(regex, result.runtime_profile)
    assert len(matches) == 1 and len(matches[0]) == 3, result.runtime_profile
    num_instances_per_host = [int(i) for i in matches[0]]
    num_instances_per_host.sort()
    expected_num_instances_per_host.sort()
    assert num_instances_per_host == expected_num_instances_per_host, \
      result.runtime_profile
    self.client.clear_configuration()


class TestInsertNonPartitionedTable(ImpalaTestSuite):
  @classmethod
  def add_test_dimensions(cls):
    super(TestInsertNonPartitionedTable, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_constraint(lambda v:
          v.get_value('table_format').file_format == 'text'
          and v.get_value('table_format').compression_codec == 'none')
    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())

  @classmethod
  def setup_class(cls):
    super(TestInsertNonPartitionedTable, cls).setup_class()

  def test_insert_load_file_fail(self, vector, unique_database):
    """Tests metadata won't be corrupted after file metadata loading fails
    in non-partitioned tables."""
    table_name = '{0}.{1}'.format(unique_database, 'test_unpartition_tbl')
    self.client.execute('create table {0}(f0 int)'
        .format(table_name))
    self.client.execute('insert overwrite table {0} select 0'
        .format(table_name))
    result = self.client.execute("select f0 from {0}".format(table_name))
    assert result.data == ["0"]

    exec_options = vector.get_value('exec_option')
    exec_options['debug_action'] = 'catalogd_load_file_metadata_throw_exception'
    try:
      self.execute_query("insert overwrite table {0} select 1"
          .format(table_name), exec_options)
      assert False, "Expected query to fail."
    except Exception as e:
      assert "Failed to load metadata for table:" in str(e)

    exec_options['debug_action'] = ''
    self.execute_query("insert overwrite table {0} select 2"
        .format(table_name), exec_options)
    result = self.client.execute("select f0 from {0}".format(table_name))
    assert result.data == ["2"]

  @pytest.mark.execute_serially
  def test_parallel_checksum(self, vector, unique_database):
    """Test that checksum is calculated in parallel when inserting into a table
    with multiple files."""
    # Ensure source table is loaded into catalogd.
    self.execute_query("describe functional.alltypesaggmultifilesnopart")

    exec_options = vector.get_value('exec_option')
    exec_options['debug_action'] = 'catalogd_load_file_checksums_delay:SLEEP@3000'
    handle = self.execute_query_async("create table {0}.test as select * from "
        "functional.alltypesaggmultifilesnopart".format(unique_database), exec_options)

    # Test file has 4 files, so work can be distributed across 3 executors. This results
    # in writing 3 new files in 3 threads.
    catalogd = ImpalaCluster.get_e2e_test_cluster().catalogd.service
    catalogd.wait_for_metric_value(
        "catalog-server.metadata.file.num-loading-threads", 3)
    catalogd.wait_for_metric_value(
        "catalog-server.metadata.table.num-loading-file-metadata", 1)
    is_finished = self.client.wait_for_finished_timeout(handle, 30)
    assert is_finished, "Query '{}' failed".format(handle.sql_stmt())

    # Stop the query and cleanup.
    self.close_query(handle)


class TestUnsafeImplicitCasts(ImpalaTestSuite):
  """Test to check 'allow_unsafe_casts' query-option behaviour on insert statements."""
  @classmethod
  def add_test_dimensions(cls):
    super(TestUnsafeImplicitCasts, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        (v.get_value('table_format').file_format == 'parquet'))

  def test_unsafe_insert(self, vector, unique_database):
    create_stmt = """create table {0}.unsafe_insert(tinyint_col tinyint,
      smallint_col smallint, int_col int, bigint_col bigint, float_col float,
      double_col double, decimal_col decimal, timestamp_col timestamp, date_col date,
      string_col string, varchar_col varchar(100), char_col char(100),
      bool_col boolean, binary_col binary)""".format(unique_database)
    create_partitioned_stmt = """create table {0}.unsafe_insert_partitioned(int_col int,
      tinyint_col tinyint) partitioned by(string_col string)""".format(unique_database)
    self.client.execute(create_stmt)
    self.client.execute(create_partitioned_stmt)
    vector.get_value('exec_option')['allow_unsafe_casts'] = "true"
    self.run_test_case('QueryTest/insert-unsafe', vector, unique_database)
